Step by Step: The MogDB Hash Join Implementation
There are three algorithms for implementing a join: Nested Loop, Merge Join, and Hash Join. Each has its own strengths and weaknesses:
Nested Loop usually performs poorly, but it can handle any type of join;
Merge Join performs very well on pre-sorted data;
Hash Join is usually the fastest on large data volumes, but it can only handle equality conditions, not join conditions such as c1 > c2. Hash Join is an important operator for analytical queries.
Hash Join is a classic two-phase algorithm. The first phase is the build phase: a hash table is built, ideally over the smaller table, which is usually called the inner table. The second phase is the probe phase: the other table, usually called the outer table, is scanned, and each of its rows is probed against the hash table to check for matching rows.
MogDB=# explain analyze select * from t1 inner join t2 on t1.c1 = t2.c1;
QUERY PLAN
----------------------------------------------------------------------------------------------------
Hash Join (cost=270.09..2660.69 rows=11602 width=8) (actual time=10.681..83.223 rows=20016 loops=1)
Hash Cond: (t1.c1 = t2.c1)
-> Seq Scan on t1 (cost=0.00..1587.05 rows=110005 width=4) (actual time=0.023..26.011 rows=110005 loops=1)
-> Hash (cost=145.04..145.04 rows=10004 width=4) (actual time=10.274..10.274 rows=10003 loops=1)
Buckets: 32768 Batches: 1 Memory Usage: 352kB
-> Seq Scan on t2 (cost=0.00..145.04 rows=10004 width=4) (actual time=0.018..3.840 rows=10004 loops=1)
Total runtime: 86.694 ms
(7 rows)
Tip:
When Hash Join performance is poor, the first thing to check in the explain analyze output is whether Batches is greater than 1, i.e. whether the join has spilled to disk.
Image source: https://www.cnblogs.com/flflying-tiger/p/5.html
In the diagram, a single arrow means that, under certain conditions, the state at the arrow's tail transitions to the state at its head; a double arrow means the two states can transition into each other; an arrow pointing back to the same state means the state does some processing and then returns to itself, re-checking whether a transition condition is now satisfied; the thick black circle marks the terminal state, i.e. the state in which the program can finish and output a joined tuple.
The first state is HJ_BUILD_HASHTABLE, which builds the hash table over the inner tuples.
Once the first step is done we have a hash table of inner tuples, so we now need outer tuples, which takes us to HJ_NEED_NEW_OUTER. In this state we try to fetch an outer tuple. If there is none, then depending on the join type we either go to HJ_FILL_INNER_TUPLES to emit NULL-padded tuples (for RIGHT or FULL joins) or go to HJ_NEED_NEW_BATCH. If we do get an outer tuple, we move on to HJ_SCAN_BUCKET, or loop through HJ_NEED_NEW_OUTER once more (in the disk-spill case, if the current tuple does not belong to the current batch it is written to disk and we keep reading the outer table).
Next comes HJ_SCAN_BUCKET. By this point we have an outer tuple, so we look it up in the hash table to see whether a matching inner tuple exists. If not, we move on to HJ_FILL_OUTER_TUPLE. If we do find a match that satisfies the join condition, and this is not an ANTI join (an ANTI join must discard inner/outer pairs that match), we can stop here and return the joined tuple.
Then there is HJ_FILL_OUTER_TUPLE. Reaching this state means the inner and outer tuples did not match, so the next step should be to fetch another outer tuple, i.e. go back to HJ_NEED_NEW_OUTER. However, if a tuple of the form (t1, NULL) may be returned (that is, for a LEFT or FULL join), it must be returned first, because it is part of the required result. That is why this extra state exists: if such a tuple can be returned in the current situation, it is returned.
The above handles the outer side; HJ_FILL_INNER_TUPLES handles the inner side. For a right outer join or a full join, unmatched inner tuples must be emitted with the outer side padded with NULLs, so this state keeps fetching inner tuples from the current batch. When the batch is exhausted we must move on to the next one, which leads to HJ_NEED_NEW_BATCH. That state keeps fetching the next batch; when a new batch is loaded, processing returns to HJ_NEED_NEW_OUTER for the next round, and when no batches remain the join is finished.
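In the PostgreSQL code lineage that MogDB's executor derives from, these states are simply small integer #defines that drive the switch statement inside ExecHashJoin (shown later in this article). A minimal sketch of the state set; the names match the states described above, while the numeric values are only illustrative:

/* Hash join state-machine states consumed by the switch in ExecHashJoin().
 * The concrete integer values here are an assumption for illustration. */
#define HJ_BUILD_HASHTABLE   1   /* build the hash table over the inner relation */
#define HJ_NEED_NEW_OUTER    2   /* fetch the next outer tuple */
#define HJ_SCAN_BUCKET       3   /* probe the bucket this outer tuple hashes to */
#define HJ_FILL_OUTER_TUPLE  4   /* emit outer tuple + NULL inner side (LEFT/FULL joins) */
#define HJ_FILL_INNER_TUPLES 5   /* emit unmatched inner tuples + NULL outer side (RIGHT/FULL joins) */
#define HJ_NEED_NEW_BATCH    6   /* advance to the next batch, or finish the join */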
The hash table used by Hash Join is implemented with chaining.
key: the value obtained by applying each column type's hash function to the column data.
value: a MinimalTuple
typedef struct HashJoinTupleData {
struct HashJoinTupleData* next; /* link to next tuple in same bucket */
uint32 hashvalue; /* tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
typedef struct HashJoinTableData {
int nbuckets; /* # buckets in the in-memory hash table */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
struct HashJoinTupleData** buckets;
...
} HashJoinTableData;
[HashJoinTupleData*]
[HashJoinTupleData*]->[HashJoinTupleData*]->[HashJoinTupleData*]->....
[HashJoinTupleData*]
...
/* Inserting data into the hash table */
/* The memory block starts with the HashJoinTupleData struct, followed by the MinimalTuple */
hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
hashTuple = (HashJoinTuple)dense_alloc(hashtable, hashTupleSize);
hashTuple->hashvalue = hashvalue;
errorno = memcpy_s(HJTUPLE_MINTUPLE(hashTuple), tuple->t_len, tuple, tuple->t_len);
This state has to determine two values: the number of batches (i.e. whether we need to spill to disk) and the number of buckets (the size of the hash table).
① First estimate the memory needed to hold the inner table's data (inner_rel_bytes).
② Estimate the space occupied by a single bucket.
③ Estimate the number of buckets and compute the total space taken by all buckets.
④ Fetch the inner table's tuples in order, hash each tuple, and derive its batch number and bucket number from the hash value. A tuple whose batch number is 0 is placed into the corresponding in-memory bucket; otherwise it is written to the temporary file created for that batch of the inner relation. At this point curbatch, the batch currently held in memory, is 0.
Note:
For a tuple whose hash value is hashvalue, its batch number is (hashvalue / nbucket) % nbatch and its bucket number is hashvalue % nbucket.
So that the division and modulo can be done with bit operations (AND and shift), nbatch and nbucket are rounded up to powers of two, with 2^log2_nbuckets = nbucket. The batch number is then (hashvalue >> log2_nbuckets) & (nbatch - 1) and the bucket number is hashvalue & (nbucket - 1).
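A small, self-contained C sketch of this computation, mirroring the logic of ExecHashGetBucketAndBatch (the function and variable names below are illustrative, not MogDB's):

#include <stdint.h>
#include <stdio.h>

/* nbuckets and nbatch are powers of two, so the modulo and division collapse
 * into masking and shifting, exactly as in the formulas above */
static void get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets,
                                 int nbuckets, int nbatch,
                                 int* bucketno, int* batchno)
{
    *bucketno = hashvalue & (nbuckets - 1);                 /* hashvalue % nbuckets */
    *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1); /* (hashvalue / nbuckets) % nbatch */
}

int main(void)
{
    int bucketno, batchno;
    /* e.g. 32768 buckets (2^15) and 4 batches */
    get_bucket_and_batch(0x9E3779B9u, 15, 32768, 4, &bucketno, &batchno);
    printf("bucket=%d batch=%d\n", bucketno, batchno);
    return 0;
}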
HashJoinTable ExecHashTableCreate(Hash* node, List* hashOperators, bool keepNulls)
{
...
/* Compute the batch and bucket counts */
ExecChooseHashTableSize(PLAN_LOCAL_ROWS(outerNode) / SET_DOP(node->plan.dop),
outerNode->plan_width,
OidIsValid(node->skewTable),
&nbuckets,
&nbatch,
&num_skew_mcvs,
local_work_mem);
...
/* If nbatch is greater than 1 we need to spill to disk, so prepare the temporary file space */
if (nbatch > 1) {
/*
* allocate and initialize the file arrays in hashCxt
*/
hashtable->innerBatchFile = (BufFile**)palloc0(nbatch * sizeof(BufFile*));
hashtable->outerBatchFile = (BufFile**)palloc0(nbatch * sizeof(BufFile*));
/* The files will not be opened until needed... */
/* ... but make sure we have temp tablespaces established for them */
PrepareTempTablespaces();
}
/*
* Set up for skew optimization, if possible and there's a need for more
* than one batch. (In a one-batch join, there's no point in it.)
*/
if (nbatch > 1)
ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
MemoryContextSwitchTo(oldcxt);
return hashtable;
}
/* Scan the Hash child node and insert its rows into the hash table or into the temporary files */
Node* MultiExecHash(HashState* node)
{
...
for (;;) {
slot = ExecProcNode(outerNode);
if (unlikely(node->cbstate)){
slot = node->cbstate->call_back(node->cbstate, slot);
}
if (TupIsNull(slot))
break;
/* We have to compute the hash value */
econtext->ecxt_innertuple = slot;
if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, hashtable->keepNulls, &hashvalue)) {
int bucketNumber;
bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
if (bucketNumber != INVALID_SKEW_BUCKET_NO) {
/* It's a skew tuple, so put it into that hash table */
ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber);
} else {
/* Not subject to skew optimization, so insert normally */
ExecHashTableInsert(hashtable,
slot,
hashvalue,
node->ps.plan->plan_node_id,
SET_DOP(node->ps.plan->dop),
node->ps.instrument);
}
hashtable->totalTuples += 1;
}
}
...
}
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int* numbuckets,
int* numbatches, int* num_skew_mcvs, int4 localWorkMem, bool vectorized, OpMemInfo* memInfo)
{
...
if (vectorized) {
tupsize = sizeof(void*) + MAXALIGN(tupwidth);
} else {
/* Space taken by a single tuple row in the hash table */
tupsize = HJTUPLE_OVERHEAD + MAXALIGN(sizeof(MinimalTupleData)) + MAXALIGN(tupwidth);
}
inner_rel_bytes = ntuples * tupsize; // estimated size of the inner table; ntuples is the planner's estimated row count for that table
hash_table_bytes = localWorkMem * 1024L;
/* skew optimization */
if (useskew) {
skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
*num_skew_mcvs =
skew_table_bytes / (tupsize + (8 * sizeof(HashSkewBucket*)) + sizeof(int) + SKEW_BUCKET_OVERHEAD);
if (*num_skew_mcvs > 0) {
hash_table_bytes -= skew_table_bytes;
}
} else {
*num_skew_mcvs = 0;
}
...
max_pointers = (localWorkMem * 1024L) / hash_header_size;
max_pointers = Min(max_pointers, (long)(MaxAllocSize / hash_header_size));
/* If max_pointers isn't a power of 2, must round it down to one */
mppow2 = 1UL << my_log2(max_pointers);
if (max_pointers != mppow2) {
max_pointers = mppow2 / 2;
}
/* Also ensure we avoid integer overflow in nbatch and nbuckets */
/* (this step is redundant given the current value of MaxAllocSize) */
max_pointers = Min(max_pointers, INT_MAX / 2);
/* Each bucket is just a HashJoinTupleData* pointer, so first work out the maximum number of pointers the available memory can hold */
/* NTUP_PER_BUCKET defaults to 1; we then use the estimated row count to derive a bucket count */
dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
dbuckets = Min(dbuckets, max_pointers);
nbuckets = (int)dbuckets;
/* don't let nbuckets be really small, though ... */
nbuckets = Max(nbuckets, MIN_HASH_BUCKET_SIZE);
/* ... and force it to be a power of 2. */
nbuckets = 1 << my_log2(nbuckets);
/*
* If there's not enough space to store the projected number of tuples and
* the required bucket headers, we will need multiple batches.
*/
bucket_bytes = ((int)hash_header_size) * nbuckets; /* space taken by the buckets */
/* If this exceeds work_mem, memory alone cannot hold all the data, so we must spill to disk to avoid running out of memory */
if (inner_rel_bytes + bucket_bytes > hash_table_bytes) {
/* We'll need multiple batches */
int lbuckets;
double dbatch;
int minbatch;
double max_batch;
int bucket_size;
/*
* Estimate the bucket count when work_mem is completely used up.
* Each bucket holds one bucket pointer plus NTUP_PER_BUCKET tuples.
*/
bucket_size = ((int)tupsize * NTUP_PER_BUCKET + hash_header_size);
lbuckets = 1UL << my_log2(hash_table_bytes / bucket_size);
lbuckets = Min(lbuckets, max_pointers);
nbuckets = (int)lbuckets;
nbuckets = 1 << my_log2(nbuckets);
bucket_bytes = (int)nbuckets * hash_header_size;
/*
* Buckets are simple pointers to hash join tuples, while tupsize
* includes the pointer, hash code, and MinimalTupleData. So buckets
* should never really exceed 25% of work_mem (even for
* NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
* 2^N bytes, where we might get more because of doubling. So let's
* look for 50% here.
*/
Assert(bucket_bytes <= hash_table_bytes / 2);
/* Calculate required number of batches. */
dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
dbatch = Min(dbatch, max_pointers);
minbatch = (int)dbatch;
nbatch = 2;
while (nbatch < minbatch) {
nbatch <<= 1;
}
/*
* This Min() step limits nbatch so that the pointer arrays
* we'll try to allocate do not exceed MaxAllocSize.
*/
max_batch = (MaxAllocSize + 1) / sizeof(BufFile*) / 2;
nbatch = (int)Min(nbatch, max_batch);
}
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
...
}
① Fetch a tuple from the outer table, hash it, and derive its batch number and bucket number. If the batch number equals the batch currently in memory, scan the corresponding bucket directly, looking for tuples that satisfy the join condition and joining them; otherwise write the tuple to the temporary file created for that batch of the outer table. Repeat until the outer table has been scanned completely.
② When the current batch is finished, read all tuples from the inner table's temporary file for batch curbatch + 1, hash them into the appropriate buckets, and increment curbatch by 1.
③ Read the tuples stored in the outer table's temporary file for batch curbatch one by one, compute each tuple's bucket number, scan the inner tuples in that bucket, and join those that satisfy the join condition.
④ Repeat steps 2 and 3 until all batches have been processed.
Three implementations of MogDB Hash Join
1. The in-memory version: the In-Memory Hash Join algorithm
for each row R1 in the build table
begin
calculate hash value on R1 join key(s)
insert R1 into the appropriate hash bucket
end
for each row R2 in the probe table
begin
calculate hash value on R2 join key(s)
for each row R1 in the corresponding hash bucket
if R1 joins with R2
return (R1, R2)
end
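To make the pseudocode concrete, the following is a minimal, self-contained C sketch of the in-memory version, joining plain integers on equality with a chained hash table (toy hash function and types, for illustration only; this is not MogDB code):

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

#define NBUCKETS 8 /* power of two, as in the real executor */

/* one entry in a bucket chain; the "tuple" here is just an int key */
typedef struct Entry {
    struct Entry* next;
    uint32_t hashvalue;
    int key;
} Entry;

static uint32_t hash_int(int k) { return (uint32_t)k * 2654435761u; } /* toy hash */

int main(void)
{
    int inner[] = {1, 2, 3, 5, 8}; /* build (inner) table */
    int outer[] = {2, 4, 5, 8, 9}; /* probe (outer) table */
    Entry* buckets[NBUCKETS] = {0};
    size_t i;

    /* build phase: insert every inner row at the head of its bucket chain */
    for (i = 0; i < sizeof(inner) / sizeof(inner[0]); i++) {
        Entry* e = malloc(sizeof(Entry));
        e->hashvalue = hash_int(inner[i]);
        e->key = inner[i];
        e->next = buckets[e->hashvalue & (NBUCKETS - 1)];
        buckets[e->hashvalue & (NBUCKETS - 1)] = e;
    }

    /* probe phase: hash each outer row and walk the matching bucket chain */
    for (i = 0; i < sizeof(outer) / sizeof(outer[0]); i++) {
        uint32_t h = hash_int(outer[i]);
        Entry* e;
        for (e = buckets[h & (NBUCKETS - 1)]; e != NULL; e = e->next) {
            if (e->hashvalue == h && e->key == outer[i])
                printf("joined: inner=%d outer=%d\n", e->key, outer[i]);
        }
    }
    return 0;
}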
The in-memory version looks very straightforward, but it ignores one question: what if the inner table is too large to fit in memory? That question leads to the second implementation of Hash Join.
2. When memory is not enough: the Hybrid Hash Join algorithm
Hybrid Hash Join is the classic Hash Join algorithm for the case where memory is insufficient. The inner table and the outer table are both partitioned on the join key (the number of partitions is the batch count computed when the hash table is created), each partition's data is stored on disk, and the first version of the Hash Join algorithm is applied to each partition. Each partition is one batch. One optimization is that the first batch does not have to be written to disk, which avoids the disk I/O for that batch.
The Hybrid Hash Join workflow:
First partition the inner table into batches, computing batchno for each tuple with the formula given earlier. If a tuple belongs to batch 0, add it to the in-memory hash table; otherwise write it to the disk file for its batch. Batch 0 never needs a disk file.
Then partition the outer table into batches. If an outer tuple belongs to batch 0, run the Hash Join algorithm described earlier: check whether the hash table contains an inner tuple that matches the outer tuple; if one exists and all the WHERE conditions hold, a match is found and a result is emitted, otherwise move on to the next tuple. If the outer tuple does not belong to batch 0, write it to the outer table's disk file for its batch.
The images above are from: https://cn.greenplum.org/hashjoin-in-pggp/
When the scan of the outer table ends, batch 0 is also done. Processing then continues with batch 1: load batch 1's inner-table temporary data into memory, build the hash table, scan the outer table's batch 1 file, and run the probe described above. After batch 1 comes batch 2, and so on until all batches are done. Of course, a single batch may still be too large to fit entirely in memory. In that case the batch count is doubled from n to 2n and the inner-table data of that batch is rescanned and re-hashed: tuples that still belong to the batch stay in memory, the others are written to files. Because the batch count has changed, some tuples in a batch may no longer belong to the current batch. The Hybrid Hash Join algorithm (through its modulo computation) guarantees that after the batch count doubles, a tuple's batch number can only move forward, never backward, as the small demonstration below shows.
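The reason is visible in the batchno formula: doubling nbatch only adds one more high-order bit of the hash value to the batch number, so the new batch number is either unchanged or larger. A small standalone demonstration (the hash values and sizes are arbitrary, chosen only for illustration):

#include <stdio.h>
#include <stdint.h>

/* batchno exactly as computed earlier: the hash bits above log2(nbuckets) */
static int batch_of(uint32_t hashvalue, int log2_nbuckets, int nbatch)
{
    return (int)((hashvalue >> log2_nbuckets) & (uint32_t)(nbatch - 1));
}

int main(void)
{
    const int log2_nbuckets = 10; /* assume 1024 buckets */
    uint32_t samples[] = {0x12345678u, 0xDEADBEEFu, 0x00000400u, 0xCAFEBABEu};
    int i;

    for (i = 0; i < 4; i++) {
        int before = batch_of(samples[i], log2_nbuckets, 4); /* nbatch = 4 */
        int after = batch_of(samples[i], log2_nbuckets, 8);  /* nbatch doubled to 8 */
        /* 'after' is always 'before' or 'before + 4'; it never decreases */
        printf("hash=%08x batch(nbatch=4)=%d batch(nbatch=8)=%d\n",
               (unsigned)samples[i], before, after);
    }
    return 0;
}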
Hybrid Hash Join solves the memory problem, but MogDB's Hash Join adds one more optimization on top of Hybrid Hash Join, which gives the third version: skew optimization.
3. The optimized Hybrid Hash Join (skew optimization)
The skew optimization is only attempted when the following conditions hold (checked in the planner code below):
1. There is exactly one join condition
2. The join condition is an OpExpr
3. The left argument of the join condition is a plain column reference
if (list_length(hashclauses) == 1) {
OpExpr* clause = (OpExpr*)linitial(hashclauses);
Node* node = NULL;
Assert(is_opclause(clause));
node = (Node*)linitial(clause->args);
if (IsA(node, RelabelType))
node = (Node*)((RelabelType*)node)->arg;
if (IsA(node, Var)) {
Var* var = (Var*)node;
RangeTblEntry* rte = NULL;
rte = root->simple_rte_array[var->varno];
if (rte->rtekind == RTE_RELATION) {
skewTable = rte->relid;
skewColumn = var->varattno;
skewInherit = rte->inh;
skewColType = var->vartype;
skewColTypmod = var->vartypmod;
}
}
}
The core idea of the skew optimization is to avoid disk I/O as much as possible: during batch 0, also process the most common values (MCVs) of the outer table. The outer table's MCVs are used rather than the inner table's because the optimizer usually picks the smaller and more evenly distributed table as the inner table, so the outer table tends to be larger, or more likely to be skewed.
The first step is to prepare the skew hash table, which consists of three sub-steps:
Determine the size of the skew hash table. By default MogDB reserves 2% of the memory for the skew hash table and computes how many MCV tuples it can hold.
/* skew optimization */
if (useskew) {
skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; // SKEW_WORK_MEM_PERCENT is 2
/* ----------
* Divisor is:
* size of a hash tuple +
* worst-case size of skewBucket[] per MCV +
* size of skewBucketNums[] entry +
* size of skew bucket struct itself
* ----------
*/
// hashtable->skewBucket will hold 8 times as many pointers as the number of allowed MCVs
*num_skew_mcvs =
skew_table_bytes / (tupsize + (8 * sizeof(HashSkewBucket*)) + sizeof(int) + SKEW_BUCKET_OVERHEAD);
if (*num_skew_mcvs > 0) {
hash_table_bytes -= skew_table_bytes;
}
} else {
*num_skew_mcvs = 0;
}
Using the pg_statistic syscache, obtain the MCV statistics of the outer table. For each MCV, compute its hash value and place it into the corresponding skew hash bucket; the inner table has not been processed yet, so the bucket's tuple list still points to NULL. Hash collisions are resolved by linear probing: if the current slot is taken, the next one is used.
This is where the skew hash table differs from the in-memory hash table above: it is implemented with open addressing.
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash* node, int mcvsToUse)
{
/*
* Try to find the MCV statistics for the outer relation's join key.
*
* Note: We don't consider multi-column skew-optimization values here(improve later)
*/
statsTuple = SearchSysCache4(STATRELKINDATTINH,
ObjectIdGetDatum(node->skewTable),
CharGetDatum(stakind),
Int16GetDatum(node->skewColumn),
BoolGetDatum(node->skewInherit));
if (!HeapTupleIsValid(statsTuple)) {
return;
}
if (get_attstatsslot(statsTuple,
node->skewColType,
node->skewColTypmod,
STATISTIC_KIND_MCV,
InvalidOid,
NULL,
&values,
&nvalues,
&numbers,
&nnumbers)) {
double frac;
int nbuckets;
FmgrInfo* hashfunctions = NULL;
int i;
if (mcvsToUse > nvalues) {
mcvsToUse = nvalues;
}
/*
* Calculate the expected fraction of outer relation that will
* participate in the skew optimization. If this isn't at least
* SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
*/
frac = 0;
for (i = 0; i < mcvsToUse; i++) {
frac += numbers[i];
}
if (frac < SKEW_MIN_OUTER_FRACTION) {
free_attstatsslot(node->skewColType, values, nvalues, numbers, nnumbers);
ReleaseSysCache(statsTuple);
return;
}
}
...
for (i = 0; i < mcvsToUse; i++) {
uint32 hashvalue;
int bucket;
hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0], values[i]));
/*
* While we have not hit a hole in the hashtable and have not hit
* the desired bucket, we have collided with some previous hash
* value, so try the next bucket location. NB: this code must
* match ExecHashGetSkewBucket.
*/
bucket = hashvalue & (nbuckets - 1);
while (hashtable->skewBucket[bucket] != NULL && hashtable->skewBucket[bucket]->hashvalue != hashvalue) {
bucket = (bucket + 1) & (nbuckets - 1);
}
/*
* If we found an existing bucket with the same hashvalue, leave
* it alone. It's okay for two MCVs to share a hashvalue.
*/
if (hashtable->skewBucket[bucket] != NULL) {
continue;
}
/* Okay, create a new skew bucket for this hashvalue. */
hashtable->skewBucket[bucket] =
(HashSkewBucket*)MemoryContextAlloc(hashtable->batchCxt, sizeof(HashSkewBucket));
hashtable->skewBucket[bucket]->hashvalue = hashvalue;
hashtable->skewBucket[bucket]->tuples = NULL;
hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
if (hashtable->spaceUsed > hashtable->spacePeak) {
hashtable->spacePeak = hashtable->spaceUsed;
}
}
...
}
...
Populate the skew hash table: while scanning the inner table to build the main hash table, if the current tuple belongs to the skew hash table, it is inserted into the skew hash table instead.
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot* slot, uint32 hashvalue, int bucketNumber)
{
...
}
When scanning the outer table, if a tuple is an MCV tuple it is processed through the skew hash table; otherwise it is handled by the Hybrid Hash Join algorithm described earlier. With the skew optimization, most of the common values are processed in batch 0, which saves most of the disk I/O.
The Hash Join algorithm itself is implemented by the ExecHashJoin function.
TupleTableSlot* ExecHashJoin(HashJoinState* node)
{
...
/*
* run the Hash join state machine
*/
for (;;) {
switch (node->hj_JoinState) {
case HJ_BUILD_HASHTABLE: {
...
oldcxt = MemoryContextSwitchTo(hashNode->ps.nodeContext);
/* Create the hash table: compute the bucket and batch counts; if disk spilling is needed, initialize the disk space; if the skew optimization applies, initialize the skew hash table */
hashtable = ExecHashTableCreate((Hash*)hashNode->ps.plan, node->hj_HashOperators,
HJ_FILL_INNER(node) || node->js.nulleqqual != NIL);
MemoryContextSwitchTo(oldcxt);
node->hj_HashTable = hashtable;
/* Hash operator: walk the inner table and insert each tuple into the hash table or into a temp-file batch */
(void)MultiExecProcNode((PlanState*)hashNode);
/*
* need to remember whether nbatch has increased since we
* began scanning the outer relation
*/
hashtable->nbatch_outstart = hashtable->nbatch;
node->hj_JoinState = HJ_NEED_NEW_OUTER;
}
/* fall through */
case HJ_NEED_NEW_OUTER:
/*
* We don't have an outer tuple, try to get the next one
*/
outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
if (TupIsNull(outerTupleSlot)) {
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node)) {
/* set up to scan for unmatched inner tuples */
/* Right outer or full join: pad with NULLs */
ExecPrepHashTableForUnmatched(node);
node->hj_JoinState = HJ_FILL_INNER_TUPLES;
} else
node->hj_JoinState = HJ_NEED_NEW_BATCH; // move on to the next batch of data
continue;
}
econtext->ecxt_outertuple = outerTupleSlot;
node->hj_MatchedOuter = false;
/*
* Find the corresponding bucket for this tuple in the main
* hash table or skew hash table.
*/
node->hj_CurHashValue = hashvalue;
ExecHashGetBucketAndBatch(hashtable, hashvalue, &node->hj_CurBucketNo, &batchno);
node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, hashvalue);
node->hj_CurTuple = NULL;
/*
* The tuple might not belong to the current batch (where
* "current batch" includes the skew buckets if any).
*/
if (batchno != hashtable->curbatch && node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) {
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
Assert(batchno > hashtable->curbatch);
MinimalTuple tuple = ExecFetchSlotMinimalTuple(outerTupleSlot);
ExecHashJoinSaveTuple(tuple, hashvalue, &hashtable->outerBatchFile[batchno]);
*hashtable->spill_size += sizeof(uint32) + tuple->t_len;
pgstat_increase_session_spill_size(sizeof(uint32) + tuple->t_len);
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
}
/* OK, let's scan the bucket for matches */
node->hj_JoinState = HJ_SCAN_BUCKET;
/* Prepare for the clear-process if necessary */
if (jointype == JOIN_RIGHT_ANTI || jointype == JOIN_RIGHT_SEMI)
node->hj_PreTuple = NULL;
/* fall through */
case HJ_SCAN_BUCKET:
/*
* We check for interrupts here because this corresponds to
* where we'd fetch a row from a child plan node in other join
* types.
*/
CHECK_FOR_INTERRUPTS();
/* Can a match be found in the corresponding bucket? */
if (!ExecScanHashBucket(node, econtext)) {
/* out of matches; check for possible outer-join fill */
node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
continue;
}
/* A hash match was found; check whether the join condition is actually satisfied */
if (joinqual == NIL || ExecQual(joinqual, econtext, false)) {
node->hj_MatchedOuter = true;
/*
* for right-anti join: skip and delete the matched tuple;
* for right-semi join: return and delete the matched tuple;
* for right-anti-full join: skip and delete the matched tuple;
*/
if (jointype == JOIN_RIGHT_ANTI || jointype == JOIN_RIGHT_SEMI ||
jointype == JOIN_RIGHT_ANTI_FULL) {
if (node->hj_PreTuple)
node->hj_PreTuple->next = node->hj_CurTuple->next;
else if (node->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
hashtable->skewBucket[node->hj_CurSkewBucketNo]->tuples = node->hj_CurTuple->next;
else
hashtable->buckets[node->hj_CurBucketNo] = node->hj_CurTuple->next;
if (jointype == JOIN_RIGHT_ANTI || jointype == JOIN_RIGHT_ANTI_FULL)
continue;
} else {
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
/* Anti join: we never return a matched tuple */
if (jointype == JOIN_ANTI || jointype == JOIN_LEFT_ANTI_FULL) {
node->hj_JoinState = HJ_NEED_NEW_OUTER;
continue;
}
/* Semi join: we'll consider returning the first match, but after
* that we're done with this outer tuple */
if (jointype == JOIN_SEMI)
node->hj_JoinState = HJ_NEED_NEW_OUTER;
}
if (otherqual == NIL || ExecQual(otherqual, econtext, false)) {
TupleTableSlot* result = NULL;
result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
if (isDone != ExprEndResult) {
node->js.ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
return result;
}
} else
InstrCountFiltered2(node, 1);
} else {
InstrCountFiltered1(node, 1);
/* For right Semi/Anti join, we set hj_PreTuple following hj_CurTuple */
if (jointype == JOIN_RIGHT_ANTI || jointype == JOIN_RIGHT_SEMI)
node->hj_PreTuple = node->hj_CurTuple;
}
break;
case HJ_FILL_OUTER_TUPLE:
/* Left outer or full join: pad with NULLs */
/*
* The current outer tuple has run out of matches, so check
* whether to emit a dummy outer-join tuple. Whether we emit
* one or not, the next state is NEED_NEW_OUTER.
*/
node->hj_JoinState = HJ_NEED_NEW_OUTER;
if (!node->hj_MatchedOuter && HJ_FILL_OUTER(node)) {
/*
* Generate a fake join tuple with nulls for the inner
* tuple, and return it if it passes the non-join quals.
*/
econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
if (otherqual == NIL || ExecQual(otherqual, econtext, false)) {
TupleTableSlot* result = NULL;
result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
if (isDone != ExprEndResult) {
node->js.ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
return result;
}
} else
InstrCountFiltered2(node, 1);
}
break;
case HJ_FILL_INNER_TUPLES:
/* Right outer or full join: pad with NULLs */
/*
* We have finished a batch, but we are doing right/full/rightAnti join,
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
if (!ExecScanHashTableForUnmatched(node, econtext)) {
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;
}
/*
* Generate a fake join tuple with nulls for the outer tuple,
* and return it if it passes the non-join quals.
*/
econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
if (otherqual == NIL || ExecQual(otherqual, econtext, false)) {
TupleTableSlot* result = NULL;
result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
if (isDone != ExprEndResult) {
node->js.ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
return result;
}
} else
InstrCountFiltered2(node, 1);
break;
case HJ_NEED_NEW_BATCH:
/* Fetch the next batch of data */
/*
* Try to advance to next batch. Done if there are no more.
*/
if (!ExecHashJoinNewBatch(node)) {
ExecEarlyFree(outerPlanState(node));
EARLY_FREE_LOG(elog(LOG, "Early Free: HashJoin Probe is done"
" at node %d, memory used %d MB.",
(node->js.ps.plan)->plan_node_id, getSessionMemoryUsageMB()));
return NULL; /* end of join */
}
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break;
default:
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NODE_STATE),
errmodule(MOD_EXECUTOR), errmsg("unrecognized hash join state: %d", (int)node->hj_JoinState)));
}
}
}
About parallelism
In the parallel case each thread has its own hash table; with a degree of parallelism of 2, for example, two hash tables are created. The data from the two scan nodes is redistributed on the join key so that rows with the same join key are guaranteed to hash to the same thread.
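A minimal sketch of that redistribution rule (simplified and illustrative; this is not MogDB's actual stream/redistribution operator): both scans apply the same hash function to the join key, so equal keys always land in the same worker's private hash table.

#include <stdio.h>
#include <stdint.h>

/* route a row to one of 'dop' workers based on its join key; because both
 * scans apply the same function, equal keys from t1 and t2 always meet in
 * the same worker's private hash table */
static int target_worker(int joinkey, int dop)
{
    uint32_t h = (uint32_t)joinkey * 2654435761u; /* toy hash, for illustration */
    return (int)(h % (uint32_t)dop);
}

int main(void)
{
    int dop = 2; /* degree of parallelism 2, as in the example above */
    int keys[] = {7, 8, 9, 10};
    int i;
    for (i = 0; i < 4; i++)
        printf("joinkey=%d -> worker %d\n", keys[i], target_worker(keys[i], dop));
    return 0;
}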
Reference: https://cn.greenplum.org/hashjoin-in-pggp/